/*
 * Copyright (C) 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.source.reader.io.transform;

import com.google.auto.value.AutoValue;
import com.google.cloud.teleport.v2.source.reader.io.row.SourceRow;
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceTableReference;
import java.io.Serializable;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;

/**
 * ReaderTransform encloses the tuple tags and the PTransform generated by the reader.
 *
 * <p>The ReaderTransform sits close to the pipeline root and returns two PCollections as a
 * tagged-tuple output:
 *
 * <ol>
 *   <li><code>PCollection&lt;SourceRow&gt;</code> - The source rows of all the tables that the
 *       reader is reading.
 *   <li><code>PCollection&lt;SourceTableReference&gt;</code> - A reference to each table that has
 *       completed its read.
 * </ol>
 *
 * <p>The ReaderTransform, via the {@link AccumulatingTableReader}, also provides a convenient
 * bridge for the actual Reader to flatten the output of all the tables into one and generate their
 * completions. The Reader just has to initialize the transforms for individual tables (such as the
 * ones provided by {@link org.apache.beam.sdk.io.jdbc.JdbcIO.ReadWithPartitions}) and add them to
 * the ReaderTransform builder.
 *
 * <p><b>Example for building by the reader</b>
 *
 * <pre>{@code
 * var builder = ReaderTransform.builder();
 * tablesToMigrate.forEach(
 *     table -> builder.withTableReader(table, getJdbcIOReadWithPartitions(table)));
 * this.readerTransform = builder.build();
 * }</pre>
 *
 * <p><b>Note:</b> Flattening all table outputs into a single collection works well only if the
 * transformer and writer transforms are at the level of a Spanner database and not at the level of
 * a Spanner table. In case there is a need to {@code groupBy} tables in any of the downstream
 * PTransforms, we should consider a replacement for {@code AccumulatingTableReader} that provides
 * the output of each table separately in the {@code PCollectionTuple}.
 */
@AutoValue
public abstract class ReaderTransform implements Serializable {

  /** Returns the tuple tag associated with the {@link SourceRow} output of this reader. */
  public abstract TupleTag<SourceRow> sourceRowTag();

  /** Returns the tuple tag associated with the {@link SourceTableReference} output. */
  public abstract TupleTag<SourceTableReference> sourceTableReferenceTag();

  /** Returns the root transform that produces the tagged-tuple output of the reader. */
  public abstract PTransform<PBegin, PCollectionTuple> readTransform();

  /**
   * Creates a {@link Builder} wired with a fresh pair of tuple tags.
   *
   * <p>The same two tags are set on the returned builder and handed to the {@link
   * AccumulatingTableReader} builder that collects the per-table readers.
   *
   * @return a builder ready to accept table readers via {@link Builder#withTableReader}.
   */
  public static Builder builder() {
    TupleTag<SourceRow> rowTag = new TupleTag<>();
    TupleTag<SourceTableReference> tableReferenceTag = new TupleTag<>();

    // Both builders must share the very same tag instances, so the tags are created once here.
    AccumulatingTableReader.Builder tableReaderBuilder =
        AccumulatingTableReader.builder(rowTag, tableReferenceTag)
            .setSourceTableReferenceTag(tableReferenceTag)
            .setSourceRowTag(rowTag);

    Builder result = new AutoValue_ReaderTransform.Builder();
    result.setSourceRowTag(rowTag).setSourceTableReferenceTag(tableReferenceTag);
    result.readTransformBuilder = tableReaderBuilder;
    return result;
  }

  /** Builder for {@link ReaderTransform}; obtain instances through {@link #builder()}. */
  @AutoValue.Builder
  public abstract static class Builder {

    /**
     * Collects the per-table readers. It is assigned by {@link ReaderTransform#builder()} and
     * stays {@code null} for a builder created in any other way.
     */
    AccumulatingTableReader.Builder readTransformBuilder;

    abstract Builder setSourceRowTag(TupleTag<SourceRow> value);

    abstract Builder setSourceTableReferenceTag(TupleTag<SourceTableReference> value);

    abstract Builder setReadTransform(PTransform<PBegin, PCollectionTuple> value);

    abstract ReaderTransform autoBuild();

    /**
     * Registers the reader of a single table by delegating to the underlying {@link
     * AccumulatingTableReader} builder.
     *
     * @param sourceTableReference reference to the table read by {@code tableReader}.
     * @param tableReader transform that reads the rows of that table.
     */
    public void withTableReader(
        SourceTableReference sourceTableReference,
        PTransform<PBegin, PCollection<SourceRow>> tableReader) {
      readTransformBuilder.withTableReader(sourceTableReference, tableReader);
    }

    /**
     * Builds the accumulated table reader, installs it as the read transform and creates the
     * {@link ReaderTransform}.
     *
     * @return the built {@link ReaderTransform}.
     */
    public ReaderTransform build() {
      PTransform<PBegin, PCollectionTuple> accumulatedReader = readTransformBuilder.build();
      setReadTransform(accumulatedReader);
      return autoBuild();
    }
  }
}
